1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.junit.Assert.fail;
20  import static org.mockito.Matchers.any;
21  import static org.mockito.Matchers.anyInt;
22  import static org.mockito.Matchers.anyString;
23  import static org.mockito.Mockito.*;
24  
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicInteger;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.junit.Before;
33  import org.junit.Test;
34  import org.mockito.MockitoAnnotations;
35  
36  import rx.Observable;
37  import rx.Observer;
38  import rx.Producer;
39  import rx.Subscriber;
40  import rx.functions.Action2;
41  import rx.functions.Func0;
42  import rx.functions.Func1;
43  import rx.functions.Func2;
44  import rx.observers.TestSubscriber;
45  
46  public class OperatorScanTest {
47  
48      @Before
49      public void before() {
50          MockitoAnnotations.initMocks(this);
51      }
52  
53      @Test
54      public void testScanIntegersWithInitialValue() {
55          @SuppressWarnings("unchecked")
56          Observer<String> observer = mock(Observer.class);
57  
58          Observable<Integer> observable = Observable.just(1, 2, 3);
59  
60          Observable<String> m = observable.scan("", new Func2<String, Integer, String>() {
61  
62              @Override
63              public String call(String s, Integer n) {
64                  return s + n.toString();
65              }
66  
67          });
68          m.subscribe(observer);
69  
70          verify(observer, never()).onError(any(Throwable.class));
71          verify(observer, times(1)).onNext("");
72          verify(observer, times(1)).onNext("1");
73          verify(observer, times(1)).onNext("12");
74          verify(observer, times(1)).onNext("123");
75          verify(observer, times(4)).onNext(anyString());
76          verify(observer, times(1)).onCompleted();
77          verify(observer, never()).onError(any(Throwable.class));
78      }
79  
80      @Test
81      public void testScanIntegersWithoutInitialValue() {
82          @SuppressWarnings("unchecked")
83          Observer<Integer> observer = mock(Observer.class);
84  
85          Observable<Integer> observable = Observable.just(1, 2, 3);
86  
87          Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {
88  
89              @Override
90              public Integer call(Integer t1, Integer t2) {
91                  return t1 + t2;
92              }
93  
94          });
95          m.subscribe(observer);
96  
97          verify(observer, never()).onError(any(Throwable.class));
98          verify(observer, never()).onNext(0);
99          verify(observer, times(1)).onNext(1);
100         verify(observer, times(1)).onNext(3);
101         verify(observer, times(1)).onNext(6);
102         verify(observer, times(3)).onNext(anyInt());
103         verify(observer, times(1)).onCompleted();
104         verify(observer, never()).onError(any(Throwable.class));
105     }
106 
107     @Test
108     public void testScanIntegersWithoutInitialValueAndOnlyOneValue() {
109         @SuppressWarnings("unchecked")
110         Observer<Integer> observer = mock(Observer.class);
111 
112         Observable<Integer> observable = Observable.just(1);
113 
114         Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {
115 
116             @Override
117             public Integer call(Integer t1, Integer t2) {
118                 return t1 + t2;
119             }
120 
121         });
122         m.subscribe(observer);
123 
124         verify(observer, never()).onError(any(Throwable.class));
125         verify(observer, never()).onNext(0);
126         verify(observer, times(1)).onNext(1);
127         verify(observer, times(1)).onNext(anyInt());
128         verify(observer, times(1)).onCompleted();
129         verify(observer, never()).onError(any(Throwable.class));
130     }
131     
132     @Test
133     public void shouldNotEmitUntilAfterSubscription() {
134         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
135         Observable.range(1, 100).scan(0, new Func2<Integer, Integer, Integer>() {
136 
137             @Override
138             public Integer call(Integer t1, Integer t2) {
139                 return t1 + t2;
140             }
141 
142         }).filter(new Func1<Integer, Boolean>() {
143 
144             @Override
145             public Boolean call(Integer t1) {
146                 // this will cause request(1) when 0 is emitted
147                 return t1 > 0;
148             }
149             
150         }).subscribe(ts);
151         
152         assertEquals(100, ts.getOnNextEvents().size());
153     }
154     
155     @Test
156     public void testBackpressureWithInitialValue() {
157         final AtomicInteger count = new AtomicInteger();
158         Observable.range(1, 100)
159                 .scan(0, new Func2<Integer, Integer, Integer>() {
160 
161                     @Override
162                     public Integer call(Integer t1, Integer t2) {
163                         return t1 + t2;
164                     }
165 
166                 })
167                 .subscribe(new Subscriber<Integer>() {
168 
169                     @Override
170                     public void onStart() {
171                         request(10);
172                     }
173 
174                     @Override
175                     public void onCompleted() {
176 
177                     }
178 
179                     @Override
180                     public void onError(Throwable e) {
181                         fail(e.getMessage());
182                         e.printStackTrace();
183                     }
184 
185                     @Override
186                     public void onNext(Integer t) {
187                         count.incrementAndGet();
188                     }
189 
190                 });
191 
192         // we only expect to receive 10 since we request(10)
193         assertEquals(10, count.get());
194     }
195     
196     @Test
197     public void testBackpressureWithoutInitialValue() {
198         final AtomicInteger count = new AtomicInteger();
199         Observable.range(1, 100)
200                 .scan(new Func2<Integer, Integer, Integer>() {
201 
202                     @Override
203                     public Integer call(Integer t1, Integer t2) {
204                         return t1 + t2;
205                     }
206 
207                 })
208                 .subscribe(new Subscriber<Integer>() {
209 
210                     @Override
211                     public void onStart() {
212                         request(10);
213                     }
214 
215                     @Override
216                     public void onCompleted() {
217 
218                     }
219 
220                     @Override
221                     public void onError(Throwable e) {
222                         fail(e.getMessage());
223                         e.printStackTrace();
224                     }
225 
226                     @Override
227                     public void onNext(Integer t) {
228                         count.incrementAndGet();
229                     }
230 
231                 });
232 
233         // we only expect to receive 10 since we request(10)
234         assertEquals(10, count.get());
235     }
236     
237     @Test
238     public void testNoBackpressureWithInitialValue() {
239         final AtomicInteger count = new AtomicInteger();
240         Observable.range(1, 100)
241                 .scan(0, new Func2<Integer, Integer, Integer>() {
242 
243                     @Override
244                     public Integer call(Integer t1, Integer t2) {
245                         return t1 + t2;
246                     }
247 
248                 })
249                 .subscribe(new Subscriber<Integer>() {
250 
251                     @Override
252                     public void onCompleted() {
253 
254                     }
255 
256                     @Override
257                     public void onError(Throwable e) {
258                         fail(e.getMessage());
259                         e.printStackTrace();
260                     }
261 
262                     @Override
263                     public void onNext(Integer t) {
264                         count.incrementAndGet();
265                     }
266 
267                 });
268 
269         // we only expect to receive 101 as we'll receive all 100 + the initial value
270         assertEquals(101, count.get());
271     }
272 
273     /**
274      * This uses the public API collect which uses scan under the covers.
275      */
276     @Test
277     public void testSeedFactory() {
278         Observable<List<Integer>> o = Observable.range(1, 10)
279                 .collect(new Func0<List<Integer>>() {
280 
281                     @Override
282                     public List<Integer> call() {
283                         return new ArrayList<Integer>();
284                     }
285                     
286                 }, new Action2<List<Integer>, Integer>() {
287 
288                     @Override
289                     public void call(List<Integer> list, Integer t2) {
290                         list.add(t2);
291                     }
292 
293                 }).takeLast(1);
294 
295         assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
296         assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
297     }
298 
299     @Test
300     public void testScanWithRequestOne() {
301         Observable<Integer> o = Observable.just(1, 2).scan(0, new Func2<Integer, Integer, Integer>() {
302 
303             @Override
304             public Integer call(Integer t1, Integer t2) {
305                 return t1 + t2;
306             }
307 
308         }).take(1);
309         TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
310         o.subscribe(subscriber);
311         subscriber.assertReceivedOnNext(Arrays.asList(0));
312         subscriber.assertTerminalEvent();
313         subscriber.assertNoErrors();
314     }
315 
316     @Test
317     public void testScanShouldNotRequestZero() {
318         final AtomicReference<Producer> producer = new AtomicReference<Producer>();
319         Observable<Integer> o = Observable.create(new Observable.OnSubscribe<Integer>() {
320             @Override
321             public void call(final Subscriber<? super Integer> subscriber) {
322                 Producer p = spy(new Producer() {
323 
324                     private AtomicBoolean requested = new AtomicBoolean(false);
325 
326                     @Override
327                     public void request(long n) {
328                         if (requested.compareAndSet(false, true)) {
329                             subscriber.onNext(1);
330                         } else {
331                             subscriber.onCompleted();
332                         }
333                     }
334                 });
335                 producer.set(p);
336                 subscriber.setProducer(p);
337             }
338         }).scan(100, new Func2<Integer, Integer, Integer>() {
339 
340             @Override
341             public Integer call(Integer t1, Integer t2) {
342                 return t1 + t2;
343             }
344 
345         });
346 
347         o.subscribe(new TestSubscriber<Integer>() {
348 
349             @Override
350             public void onStart() {
351                 request(1);
352             }
353 
354             @Override
355             public void onNext(Integer integer) {
356                 request(1);
357             }
358         });
359 
360         verify(producer.get(), never()).request(0);
361         verify(producer.get(), times(2)).request(1);
362     }
363 }